# -*- coding: utf-8 -*-
from datetime import timedelta
from time import time
from utils.operators.rich_sql_sensor import RichSqlSensor

from jms.dm.dm_cn_load_after_departure_detail_dt import jms_dm__dm_cn_load_after_departure_detail_dt

# Doris load label shared by the load statement, the poke query and the
# cancel statement below. The epoch-second suffix is evaluated when this
# module is imported, so every import of the file produces a new label.
# NOTE(review): presumably this keeps labels unique across runs/retries —
# confirm that import-time evaluation is the intended behaviour.
label = f"dwd_cn_load_after_departure_detail_dt_{int(time())}"
# Load timeout in whole seconds. total_seconds() is used instead of the
# `.seconds` attribute, which only holds the sub-day component of the
# timedelta and would silently wrap if the timeout were raised to a day or
# more. int() keeps the value rendering as "1800" (not "1800.0") when it is
# interpolated into the SQL text.
timeout = int(timedelta(minutes=30).total_seconds())

# Error text that turns a CANCELLED load into a success instead of a failure.
_NO_SOURCE_FILE_MSG = "No source file in this table"


def _load_succeeded(r):
    """Return True when the polled row *r* counts as a successful load.

    A row is successful when ``r[2]`` is ``'FINISHED'``, or when it is
    ``'CANCELLED'`` and ``r[7]`` contains the "no source file" text.
    NOTE(review): presumably ``r[2]``/``r[7]`` are the State and ErrorMsg
    columns of Doris ``SHOW LOAD`` — confirm against the Doris version in use.
    """
    state = r[2]
    if state == 'FINISHED':
        return True
    return state == 'CANCELLED' and _NO_SOURCE_FILE_MSG in r[7]


def _load_failed(r):
    """Return ``(failed, message)`` for the polled row *r*.

    ``failed`` is True only when ``r[2]`` is ``'CANCELLED'`` and ``r[7]`` does
    not contain the "no source file" text; ``message`` is ``str(r[7])``.
    How the sensor consumes this tuple is defined by RichSqlSensor, which is
    not visible in this file.
    """
    failed = r[2] == 'CANCELLED' and _NO_SOURCE_FILE_MSG not in r[7]
    return failed, str(r[7])


doris_jms_dwd__dwd_cn_load_after_departure_detail_dt = RichSqlSensor(
    task_id="doris_jms_dwd__dwd_cn_load_after_departure_detail_dt",
    conn_id="doris",
    pool="broker_load_pool",
    email=[
        "lukunming@jtexpress.com",
        "yl_bigdata@yl-scm.com",
    ],
    # Truncates the partitions for the execution date and the day before it,
    # then submits a broker load of the matching dt= directories. The
    # quadruple braces come out of the f-string as literal `{{ ... }}`
    # (presumably rendered later by the scheduler's Jinja templating), while
    # {label} and {timeout} are substituted immediately.
    pre_sql=f"""
             TRUNCATE TABLE jms_dws.dwd_cn_load_after_departure_detail_dt PARTITION (
                p{{{{ execution_date | cst_ds_nodash }}}}
                ,p{{{{ execution_date | date_add(-1) | cst_ds_nodash }}}}
             );
             LOAD LABEL jms_dws.{label} (
                 DATA INFILE(
                     "hdfs://{{{{var.value.hadoop_namespace}}}}/dw/hive/jms_dm.db/external/dm_cn_load_after_departure_detail_dt/dt={{{{ execution_date | cst_ds }}}}/*"
                    ,"hdfs://{{{{var.value.hadoop_namespace}}}}/dw/hive/jms_dm.db/external/dm_cn_load_after_departure_detail_dt/dt={{{{ execution_date | date_add(-1) | cst_ds }}}}/*"
                 )
                 INTO TABLE dwd_cn_load_after_departure_detail_dt
                 FORMAT AS 'PARQUET'
             )
             WITH BROKER '{{{{var.json.doris_brokers | random_choice}}}}'
             PROPERTIES ('timeout'='{timeout}', 'max_filter_ratio'='0.0')""",
    # Looks up the most recent load job carrying this label.
    poke_sql=f"SHOW LOAD FROM jms_dws WHERE label = '{label}' ORDER BY CreateTime DESC LIMIT 1",
    # Cancels the same load job by label.
    sql_on_kill=f"CANCEL LOAD FROM jms_dws WHERE LABEL = '{label}'",
    success=_load_succeeded,
    failure=_load_failed,
    poke_interval=60,
    # Two minutes longer than the load timeout passed in PROPERTIES above.
    execution_timeout=timedelta(seconds=timeout + 120),
)

# Wire the Doris load to the imported jms_dm task.
# NOTE(review): presumably `<<` is Airflow's "set upstream" operator, i.e. the
# Doris load runs only after jms_dm__dm_cn_load_after_departure_detail_dt —
# confirm that RichSqlSensor derives from Airflow's BaseOperator.
doris_jms_dwd__dwd_cn_load_after_departure_detail_dt << jms_dm__dm_cn_load_after_departure_detail_dt
